{ "cells": [ { "cell_type": "markdown", "id": "f4ccfb71", "metadata": {}, "source": [ "# Parallelizing Multiple Grid Cells on a High-Performance Computing Cluster" ] }, { "cell_type": "markdown", "id": "1f35a372", "metadata": {}, "source": [ "Building off the previous tutorial, this notebook demonstrates how to use the [Dask PBSCluster](https://jobqueue.dask.org/en/latest/generated/dask_jobqueue.PBSCluster.html) object to parallelize MUSICA simulations across HPC systems. While MUSICA is locally efficient for simulations up to around 10,000 grid cells, scaling to larger domains can benefit from parallelizing grid cell calculations to improve runtime performance. This tutorial walks through best practices for setting up and running parallel MUSICA workflows, with concrete examples and reference scaling tests performed on [NCAR’s Casper HPC system](https://ncar-hpc-docs.readthedocs.io/en/latest/compute-systems/casper/)." ] }, { "cell_type": "markdown", "id": "3adde55e", "metadata": {}, "source": [ "## 1. Importing Libraries" ] }, { "cell_type": "markdown", "id": "35845be0", "metadata": {}, "source": [ "In addition to libraries previously used throughout MUSICA tutorials, this tutorial uses Dask. Note that if you'd like to visualize Dask graphs, you will also need [Graphviz](https://graphviz.org) installed." ] }, { "cell_type": "code", "execution_count": null, "id": "47021626", "metadata": {}, "outputs": [], "source": [ "#import libraries\n", "import musica\n", "import musica.mechanism_configuration as mc\n", "import matplotlib.pyplot as plt\n", "import pandas as pd\n", "import numpy as np\n", "from dask import delayed, compute\n", "from dask.distributed import Client\n", "from dask_jobqueue import PBSCluster\n", "import numpy as np\n", "import time\n", "from scipy.stats import qmc\n", "import seaborn as sns" ] }, { "cell_type": "markdown", "id": "49c68e4d-d477-447e-83fc-08008147e478", "metadata": {}, "source": [ "## 2. Dask Cluster Set up" ] }, { "cell_type": "markdown", "id": "95d2d7b8", "metadata": {}, "source": [ "We first need to set up a Dask Cluster object. In this tutorial, we use the PBSCluster class since our HPC system uses a PBS-based scheduler, but Dask also provides an equivalent [SLURMCluster](https://jobqueue.dask.org/en/latest/generated/dask_jobqueue.SLURMCluster.html) class for SLURM-based systems. When initializing the cluster, you’ll notice it accepts many arguments that may look familiar from your system’s job scripts such as requested memory, number of cores, and walltime. Keep in mind that these values should be adapted to fit your particular workload and system constraints. As a general guideline, we find MultiGrid cell simulations under ~10,000 cells to run comfortably with about 8 GB of memory, while larger simulations ranging from 100,000 to 1,000,000 grid cells may need closer to 15 GB to run efficiently. It can be helpful to initialize the PBSCluster with extra memory as done below (10 GB)." ] }, { "cell_type": "code", "execution_count": null, "id": "edc588b5-6a65-48f2-b3a6-437bcdf066c5", "metadata": {}, "outputs": [], "source": [ "#casper\n", "cluster = PBSCluster(\n", " job_name = 'dask-test',\n", " cores = 1,\n", " memory = '10GiB',\n", " processes = 1,\n", " local_directory = '/local_scratch/pbs.$PBS_JOBID/dask/spill',\n", " resource_spec = 'select=1:ncpus=1:mem=10GB', #memory and resource especially memory should match\n", " queue = 'casper',\n", " walltime = '50:00',\n", " interface = 'ext'\n", ")" ] }, { "cell_type": "markdown", "id": "bb06180d", "metadata": {}, "source": [ "Prior to running your simulation, it can be helfpul to check the active Dask PBS configuration and resulting job scrip that will be used for your Dask workers." ] }, { "cell_type": "code", "execution_count": null, "id": "8b7564e9-2401-489d-a09e-9915a26ddf26", "metadata": { "scrolled": true }, "outputs": [ { "data": { "text/plain": [ "{'name': 'dask-worker',\n", " 'cores': None,\n", " 'memory': None,\n", " 'processes': None,\n", " 'python': None,\n", " 'interface': None,\n", " 'death-timeout': 60,\n", " 'local-directory': None,\n", " 'shared-temp-directory': None,\n", " 'extra': None,\n", " 'worker-command': 'distributed.cli.dask_worker',\n", " 'worker-extra-args': [],\n", " 'shebang': '#!/usr/bin/env bash',\n", " 'queue': None,\n", " 'account': None,\n", " 'walltime': '00:30:00',\n", " 'env-extra': None,\n", " 'job-script-prologue': [],\n", " 'resource-spec': None,\n", " 'job-extra': None,\n", " 'job-extra-directives': [],\n", " 'job-directives-skip': [],\n", " 'log-directory': None,\n", " 'scheduler-options': {}}" ] }, "execution_count": 3, "metadata": {}, "output_type": "execute_result" } ], "source": [ "#view configuration file(s) within Python\n", "from dask import config\n", "config.refresh()\n", "config.get('jobqueue.pbs')" ] }, { "cell_type": "code", "execution_count": 4, "id": "5a02e45d-7e69-4cc6-8ffb-47f49d52a2d5", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "#!/usr/bin/env bash\n", "\n", "#PBS -N dask-test\n", "#PBS -q casper\n", "#PBS -A NTDD0005\n", "#PBS -l select=1:ncpus=1:mem=10GB\n", "#PBS -l walltime=50:00\n", "\n", "/glade/work/apak/conda-envs/musicbox/bin/python -m distributed.cli.dask_worker tcp://128.117.208.119:36453 --name dummy-name --nthreads 1 --memory-limit 10.00GiB --nanny --death-timeout 60 --local-directory /local_scratch/pbs.$PBS_JOBID/dask/spill --interface ext\n", "\n" ] } ], "source": [ "print(cluster.job_script())" ] }, { "cell_type": "markdown", "id": "4bd22360", "metadata": {}, "source": [ "As in the [previous tutorial](4.%20local_parallelization.ipynb), the Dask Cluster provides a convenient and interactive dashboard to visualize your parallelization work." ] }, { "cell_type": "code", "execution_count": 5, "id": "9d16e1f9-608c-4c5e-84ca-83a118886299", "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "
\n", "
\n", "

Client

\n", "

Client-05c11d79-6197-11f0-9aeb-ac1f6bab1e7a

\n", " \n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", "
Connection method: Cluster objectCluster type: dask_jobqueue.PBSCluster
\n", " Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/apak/proxy/8787/status\n", "
\n", "\n", " \n", "\n", " \n", "
\n", "

Cluster Info

\n", "
\n", "
\n", "
\n", "
\n", "

PBSCluster

\n", "

0e43aec4

\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
\n", " Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/apak/proxy/8787/status\n", " \n", " Workers: 0\n", "
\n", " Total threads: 0\n", " \n", " Total memory: 0 B\n", "
\n", "\n", "
\n", " \n", "

Scheduler Info

\n", "
\n", "\n", "
\n", "
\n", "
\n", "
\n", "

Scheduler

\n", "

Scheduler-31c8add4-0019-4525-a08f-f35e07e0a301

\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
\n", " Comm: tcp://128.117.208.119:36453\n", " \n", " Workers: 0 \n", "
\n", " Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/apak/proxy/8787/status\n", " \n", " Total threads: 0\n", "
\n", " Started: Just now\n", " \n", " Total memory: 0 B\n", "
\n", "
\n", "
\n", "\n", "
\n", " \n", "

Workers

\n", "
\n", "\n", " \n", "\n", "
\n", "
\n", "\n", "
\n", "
\n", "
\n", "
\n", " \n", "\n", "
\n", "
" ], "text/plain": [ "" ] }, "execution_count": 5, "metadata": {}, "output_type": "execute_result" } ], "source": [ "client = Client(cluster)\n", "client" ] }, { "cell_type": "markdown", "id": "1326c7c1", "metadata": {}, "source": [ "After your cluster is created, it needs to be scaled to a certain number of Dask workers. For this tutorial, we have used between 1-4 workers and the results of their performance are included below in section 7. Note that due to the nature of the MUSICA code not releasing Python's Global Interpreter Lock, it was not found to be particularly helpful for performance to increase threads." ] }, { "cell_type": "code", "execution_count": null, "id": "4dbfb7a5", "metadata": {}, "outputs": [], "source": [ "cluster.scale(2)\n", "client.wait_for_workers(2) # wait to launch jobs until all workers needed are available" ] }, { "cell_type": "code", "execution_count": 8, "id": "6db0af64-149b-4108-ac1e-a8d3b609a206", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "{'PBSCluster-1': ,\n", " 'PBSCluster-0': }" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# See the workers from the cluster object\n", "cluster.workers" ] }, { "cell_type": "code", "execution_count": 9, "id": "0b166006-757d-4bfb-a040-09254ddbc32a", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ " Req'd Req'd Elap\n", "Job ID Username Queue Jobname SessID NDS TSK Memory Time S Time\n", "--------------- -------- -------- ---------- ------ --- --- ------ ----- - -----\n", "5630216.casper* apak jhublog* cr-login-* 72033 1 1 4gb 720:0 R 00:01\n", "5630218.casper* apak htc dask-test 103102 1 1 10gb 00:50 R 00:00\n", "5630219.casper* apak htc dask-test 103114 1 1 10gb 00:50 R 00:00\n" ] } ], "source": [ "# See the workers in the job scheduler\n", "!qstat -u $USER" ] }, { "cell_type": "markdown", "id": "578aadfe-0b44-419f-9b31-2af20c41fbdb", "metadata": {}, "source": [ "## 3. Setting up Grid Cells" ] }, { "cell_type": "markdown", "id": "ffdab444", "metadata": {}, "source": [ "We find parallelization for MUSICA simulations to increase performance on system sizes beyond ~10,000 grid cells. However, to keep simulation sizes sand batching reasonable, we use a 10,000 grid cell simulation here as an example. As in the local parallelization notebook, we first define the number of grid cells and set their initial conditions before initializing MUSICA chemistry objects." ] }, { "cell_type": "code", "execution_count": 87, "id": "bb849961-0b7d-4d76-b82a-48f5bc67cc39", "metadata": {}, "outputs": [], "source": [ "num_grid_cells = 10000" ] }, { "cell_type": "code", "execution_count": 28, "id": "32f84c85-f371-48ca-8d9b-70fec49c7dd5", "metadata": {}, "outputs": [], "source": [ "ndim = 5\n", "nsamples = num_grid_cells\n", "\n", "# Create a Latin Hypercube sampler in the unit hypercube\n", "sampler = qmc.LatinHypercube(d=ndim)\n", "\n", "# Generate samples\n", "sample = sampler.random(n=nsamples)\n", "\n", "# Define bounds for each dimension\n", "l_bounds = [275, 100753.3, 0, 0, 0] # Lower bounds\n", "u_bounds = [325, 101753.3, 10, 10, 10] # Upper bounds\n", "\n", "# Scale the samples to the defined bounds\n", "sample_scaled = qmc.scale(sample, l_bounds, u_bounds)" ] }, { "cell_type": "code", "execution_count": 86, "id": "7920bc07-b693-4837-9fa7-94d48da10206", "metadata": {}, "outputs": [], "source": [ "temperatures = sample_scaled[:, 0]\n", "pressures = sample_scaled[:, 1]\n", "concentrations = {\n", " \"A\": [],\n", " \"B\": [],\n", " \"C\": []\n", "}\n", "concentrations[\"A\"] = sample_scaled[:, 2]\n", "concentrations[\"B\"] = sample_scaled[:, 3]\n", "concentrations[\"C\"] = sample_scaled[:, 4]\n", "\n", "concentrations_solved = []\n", "time_step_length = 1\n", "sim_length = 60\n", "curr_time = 0" ] }, { "cell_type": "markdown", "id": "aaa3e946", "metadata": {}, "source": [ "## 4. Creating a Delayed Function for Dask" ] }, { "cell_type": "markdown", "id": "54f6bc1c", "metadata": {}, "source": [ "The following delayed function is the same as the one created for the previous [Local Parallelization Tutorial](4.%20local_parallelization.ipynb). However, here it is used to solve one grid cell at a time on an HPC system. Due to the low computational cost of solving single grid cells in MUSICA, this method is not recommended and is only used for scaling comparisons." ] }, { "cell_type": "code", "execution_count": null, "id": "b7946d7a", "metadata": {}, "outputs": [], "source": [ "@delayed\n", "def solve_one_cell(cell_index,temperatures,pressures,concentrations, sim_length, time_step):\n", "\n", " # Define the system\n", "\n", " A = mc.Species(name=\"A\") # Create each of the species with their respective names\n", " B = mc.Species(name=\"B\")\n", " C = mc.Species(name=\"C\")\n", " species = [A, B, C] # Bundle the species into a list\n", " gas = mc.Phase(name=\"gas\", species=species) # Create a gas phase object containing the species\n", "\n", " r1 = mc.Arrhenius( # Create the reactions with their name, constants, reactants, products, and phase\n", " name=\"A_to_B\",\n", " A=4.0e-3, # Pre-exponential factor\n", " C=50, # Activation energy (units assumed to be K)\n", " reactants=[A],\n", " products=[B],\n", " gas_phase=gas\n", " )\n", "\n", " r2 = mc.Arrhenius(\n", " name=\"B_to_C\",\n", " A=4.0e-3,\n", " C=50, \n", " reactants=[B],\n", " products=[C],\n", " gas_phase=gas\n", " )\n", "\n", " mechanism = mc.Mechanism( # Define the mechanism which contains a name, the species, the phases, and reactions\n", " name=\"musica_micm_example\",\n", " species=species,\n", " phases=[gas],\n", " reactions=[r1, r2]\n", ")\n", "\n", "\n", " #create the solver\n", " solver = musica.MICM(mechanism=mechanism, solver_type=musica.SolverType.rosenbrock_standard_order)\n", "\n", " #create the state\n", " state = solver.create_state(1)\n", " state.set_conditions(temperatures[cell_index],pressures[cell_index])\n", " cur_concentrations = {key: value[cell_index] for key, value in concentrations.items()}\n", " state.set_concentrations(cur_concentrations)\n", "\n", " time = 0.0\n", " result = []\n", " track_time = []\n", " while time <= sim_length:\n", " solver.solve(state, time)\n", " result.append(state.get_concentrations().copy())\n", " track_time.append(time)\n", " time += time_step\n", "\n", " return {\n", " \"times\": np.array(track_time),\n", " \"concentrations\": np.stack(result)\n", " }\n" ] }, { "cell_type": "code", "execution_count": null, "id": "a4e44461-d846-40f6-a0a0-def5f639e6fd", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "parallel 10000 cell finished in 60.77 seconds\n" ] } ], "source": [ "start = time.time()\n", "tasks = [solve_one_cell(i, temperatures,pressures,concentrations, sim_length, time_step_length) for i in range(num_grid_cells)]\n", "results = compute(*tasks)\n", "elapsed = time.time() - start\n", "print(f\"parallel 10000 cell finished in {elapsed:.2f} seconds\")" ] }, { "cell_type": "markdown", "id": "a7df1a69-1d3d-4bf2-a40e-021442ed5e90", "metadata": {}, "source": [ "## 5. Dask With Batching" ] }, { "cell_type": "markdown", "id": "9178f809", "metadata": {}, "source": [ "Since MUSICA already computes individual grid cells efficiently, the main performance gain from Dask comes from batching—grouping multiple grid cells together so each CPU core processes several cells at once. This allows us to fully use multiple cores while reducing overhead and improving throughput on large simulations." ] }, { "cell_type": "code", "execution_count": 49, "id": "2f9333c0-532a-4774-83f5-eca7c8a4ca07", "metadata": {}, "outputs": [], "source": [ "@delayed\n", "def solve_batch(start_idx, end_idx, temperatures, pressures, concentrations, sim_length, time_step):\n", " batch_results = []\n", " air_densities = [] #track for later visualization\n", " for cell_index in range(start_idx, end_idx):\n", " # build mechanism as before\n", " A = mc.Species(name=\"A\")\n", " B = mc.Species(name=\"B\")\n", " C = mc.Species(name=\"C\")\n", " species = [A, B, C]\n", " gas = mc.Phase(name=\"gas\", species=species)\n", "\n", " r1 = mc.Arrhenius(name=\"A_to_B\", A=4.0e-3, C=50, reactants=[A], products=[B], gas_phase=gas)\n", " r2 = mc.Arrhenius(name=\"B_to_C\", A=4.0e-3, C=50, reactants=[B], products=[C], gas_phase=gas)\n", "\n", " mechanism = mc.Mechanism(\n", " name=\"musica_micm_example\",\n", " species=species,\n", " phases=[gas],\n", " reactions=[r1, r2]\n", " )\n", " \n", " solver = musica.MICM(mechanism=mechanism, solver_type=musica.SolverType.rosenbrock_standard_order)\n", " state = solver.create_state(1)\n", " state.set_conditions(temperatures[cell_index], pressures[cell_index])\n", " air_density = state.get_conditions()['air_density']\n", " air_densities.append(air_density)\n", " cur_concentrations = {key: value[cell_index] for key, value in concentrations.items()}\n", " state.set_concentrations(cur_concentrations)\n", "\n", " time = 0.0\n", " result = []\n", " while time <= sim_length:\n", " solver.solve(state, time)\n", " result.append(state.get_concentrations().copy())\n", " time += time_step\n", "\n", " batch_results.append(np.stack(result))\n", "\n", " return np.stack(batch_results), np.array(air_densities) # shape: (num_cells_in_batch, num_timesteps, num_species)\n" ] }, { "cell_type": "code", "execution_count": null, "id": "fb85f229-4b1d-405e-b0bd-1c9e07c36a4d", "metadata": {}, "outputs": [], "source": [ "batch_size = 100 #number of grid cells solved on a single worker\n", "\n", "tasks = []\n", "for start_idx in range(0, num_grid_cells, batch_size):\n", " end_idx = min(start_idx + batch_size, num_grid_cells)\n", " task = solve_batch(\n", " start_idx, end_idx,\n", " temperatures, pressures, concentrations,\n", " sim_length, time_step_length\n", " )\n", " tasks.append(task)\n" ] }, { "cell_type": "markdown", "id": "f980ee71", "metadata": {}, "source": [ "Upon solving this multiple grid cell calculation in batches, we see that it is much more efficient than it's individually run counter part in section 4." ] }, { "cell_type": "code", "execution_count": 51, "id": "0b6e5091-e5b2-4f96-9924-13e5528514c4", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "parallel 100000 cell batched finished in 9.87 seconds\n" ] } ], "source": [ "start = time.time()\n", "results_and_densities = compute(*tasks)\n", "elapsed = time.time() - start\n", "print(f\"parallel 100000 cell batched finished in {elapsed:.2f} seconds\")" ] }, { "cell_type": "markdown", "id": "c1a39625", "metadata": {}, "source": [ "## 6. Visualizing Results" ] }, { "cell_type": "markdown", "id": "f31aac4b", "metadata": {}, "source": [ "Similar to the [Latin Hypercube Sampling Tutorial](2.%20hypercube.ipynb), the following code prepares the results of the parallelized simulations into a dataframe that gives each time step of each grid cell its own row. The formatting is slightly different due to the Dask calculation returning arrays rather than lists and dictionaries directly but the overall organization is the same." ] }, { "cell_type": "code", "execution_count": null, "id": "b8ede01f-2e5d-4573-a75b-63f045db5ef2", "metadata": {}, "outputs": [], "source": [ "#split results and air densities back into their own arrays\n", "\n", "species_names = ['A', 'B', 'C']\n", "num_time_steps = sim_length // time_step_length + 1\n", "\n", "all_results = []\n", "all_air_densities = []\n", "\n", "for batch_result, air_densities in results_and_densities:\n", " all_results.append(batch_result) # shape: (batch_size, num_timesteps, num_species)\n", " all_air_densities.append(air_densities) # shape: (batch_size,)\n", "\n", "# Concatenate over grid cells\n", "all_results = np.concatenate(all_results, axis=0) # shape: (num_grid_cells, num_timesteps, num_species)\n", "all_air_densities = np.concatenate(all_air_densities, axis=0) # shape: (num_grid_cells,)" ] }, { "cell_type": "code", "execution_count": null, "id": "faa607d9-3e44-4e9a-a149-fc298cda680d", "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
time.sENV.temperature.KENV.pressure.PaENV.air number density.mol m-3CONC.A.mol m-3CONC.B.mol m-3CONC.C.mol m-3
00301.626773101259.51656140.3767899.7681337.1421468.698319
10301.626773101259.51656140.3767895.7807539.5478878.133570
20301.626773101259.51656140.3767894.1839140.8645720.116974
30301.626773101259.51656140.3767890.3589648.1262276.105201
40301.626773101259.51656140.3767895.2338864.2997446.196699
........................
60999560301.626773101259.51656140.3767890.0012290.01145212.930998
60999660301.626773101259.51656140.3767890.0004210.00493318.148737
60999760301.626773101259.51656140.3767890.0001060.0020436.835295
60999860301.626773101259.51656140.3767890.0003700.00436513.947568
60999960301.626773101259.51656140.3767890.0005260.00477912.959798
\n", "

610000 rows × 7 columns

\n", "
" ], "text/plain": [ " time.s ENV.temperature.K ENV.pressure.Pa \\\n", "0 0 301.626773 101259.516561 \n", "1 0 301.626773 101259.516561 \n", "2 0 301.626773 101259.516561 \n", "3 0 301.626773 101259.516561 \n", "4 0 301.626773 101259.516561 \n", "... ... ... ... \n", "609995 60 301.626773 101259.516561 \n", "609996 60 301.626773 101259.516561 \n", "609997 60 301.626773 101259.516561 \n", "609998 60 301.626773 101259.516561 \n", "609999 60 301.626773 101259.516561 \n", "\n", " ENV.air number density.mol m-3 CONC.A.mol m-3 CONC.B.mol m-3 \\\n", "0 40.376789 9.768133 7.142146 \n", "1 40.376789 5.780753 9.547887 \n", "2 40.376789 4.183914 0.864572 \n", "3 40.376789 0.358964 8.126227 \n", "4 40.376789 5.233886 4.299744 \n", "... ... ... ... \n", "609995 40.376789 0.001229 0.011452 \n", "609996 40.376789 0.000421 0.004933 \n", "609997 40.376789 0.000106 0.002043 \n", "609998 40.376789 0.000370 0.004365 \n", "609999 40.376789 0.000526 0.004779 \n", "\n", " CONC.C.mol m-3 \n", "0 8.698319 \n", "1 8.133570 \n", "2 0.116974 \n", "3 6.105201 \n", "4 6.196699 \n", "... ... \n", "609995 12.930998 \n", "609996 18.148737 \n", "609997 6.835295 \n", "609998 13.947568 \n", "609999 12.959798 \n", "\n", "[610000 rows x 7 columns]" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "concentrations_expanded = []\n", "time = []\n", "\n", "for t in range(num_time_steps):\n", " current_time = t * time_step_length\n", " for g in range(num_grid_cells):\n", "\n", " # all_results[g][t] is dict with species concentrations\n", "\n", " # Extract the float value from each species list\n", " concentrations_expanded.append({cur_time: v[0] if isinstance(v, (list, np.ndarray)) and len(v) == 1 else v for k, v in all_results[g][t].items()})\n", "\n", " time.append(current_time)\n", "\n", "df_expanded = pd.DataFrame(concentrations_expanded)\n", "df_expanded = df_expanded.rename(columns={\n", " 'A': 'CONC.A.mol m-3', \n", " 'B': 'CONC.B.mol m-3', \n", " 'C': 'CONC.C.mol m-3'\n", "})\n", "df_expanded['time.s'] = time\n", "\n", "# Add environment columns same as before:\n", "num_rows = num_time_steps * num_grid_cells\n", "df_expanded['ENV.temperature.K'] = np.repeat(temperatures[0], num_rows)\n", "df_expanded['ENV.pressure.Pa'] = np.repeat(pressures[0], num_rows)\n", "df_expanded['ENV.air number density.mol m-3'] = np.repeat(all_air_densities[0], num_rows)\n", "\n", "df_expanded = df_expanded[[\n", " 'time.s', \n", " 'ENV.temperature.K', \n", " 'ENV.pressure.Pa', \n", " 'ENV.air number density.mol m-3', \n", " 'CONC.A.mol m-3', \n", " 'CONC.B.mol m-3', \n", " 'CONC.C.mol m-3'\n", "]]\n", "\n", "display(df_expanded)\n" ] }, { "cell_type": "markdown", "id": "8d11d8ea", "metadata": {}, "source": [ "Also as in previous tutorials, the results here are visualized with Seaborn to include a 95% confidence interval (CI) resulting from all grid cell results. Note here that due to this tutorial notebook having 10x the number of total grid cells along with the random nature of Latin Hypercube Sampling, the CI has started to converge." ] }, { "cell_type": "code", "execution_count": 91, "id": "78525f6b-0f4a-4a2e-825c-6c230930e8f3", "metadata": {}, "outputs": [ { "data": { "image/png": "", "text/plain": [ "
" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "sns.lineplot(data=df_expanded, x='time.s', y='CONC.A.mol m-3', errorbar=('ci', 95), err_kws={'alpha' : 0.4}, label='CONC.A.mol m-3')\n", "sns.lineplot(data=df_expanded, x='time.s', y='CONC.B.mol m-3', errorbar=('ci', 95), err_kws={'alpha' : 0.4}, label='CONC.B.mol m-3')\n", "sns.lineplot(data=df_expanded, x='time.s', y='CONC.C.mol m-3', errorbar=('ci', 95), err_kws={'alpha' : 0.4}, label='CONC.C.mol m-3')\n", "plt.title('Average concentration with CI over time')\n", "plt.ylabel('Concentration (mol m-3)')\n", "plt.xlabel('Time (s)')\n", "plt.legend(loc='center right')\n", "plt.show()" ] }, { "cell_type": "markdown", "id": "30d81cbd", "metadata": {}, "source": [ "## 7. Scaling Tests on NCAR's Casper HPC System" ] }, { "cell_type": "markdown", "id": "33bed051", "metadata": {}, "source": [ "As a reference, the following visualizations use the dask_scaling_tests.csv located in this tutorials folder. These results summarize the scaling tests done on 10,000 and 100,000 grid cell calculations on NCAR's Casper HPC system with 1-4 workers and batch sizes of 1-1,000 each." ] }, { "cell_type": "markdown", "id": "964830df", "metadata": {}, "source": [ "Starting with the 10,000 grid cell system. It was found that the greates improvement in performance occurs with the use of at least 2 workers and at least some form of batching. Performance improvements tapered at about 4 workes and batch sizes beyond 100 grid cells per batch. It should be noted that, locally, this simulation takes on average 5 seconds to run. So it is not until the use of at least 3 workers with batching that we regain this performance. The initial slowdown reflects Dask’s overhead, and we emphasize that parallelization should not replace efficient local runs when possible. We use this information to guide the higher expense 100,000 grid cell tests that follow." ] }, { "cell_type": "code", "execution_count": null, "id": "76315c0b-a198-4e95-ae6f-4a3e8bf609a7", "metadata": {}, "outputs": [ { "data": { "image/png": "", "text/plain": [ "
" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Load the CSV (replace 'your_file.csv' with your actual filename)\n", "scaling_df = pd.read_csv('config/dask_scaling_tests.csv')\n", "\n", "# Filter for grid_cells == 10000\n", "df_10k = scaling_df[scaling_df['grid_cells'] == 10000]\n", "\n", "# Plot: batch_size on x, run_time (s) on y, grouped by workers\n", "plt.figure(figsize=(8,6))\n", "\n", "for workers, group in df_10k.groupby('workers'):\n", " plt.plot(group['batch_size'], group['run_time (s)'], marker='o', label=f'{workers} workers')\n", "\n", "plt.xlabel('Batch Size')\n", "plt.ylabel('Run Time (s)')\n", "plt.title('Performance for 10,000 Grid Cells')\n", "plt.legend(title='Workers')\n", "plt.grid(True)\n", "plt.xscale('log') # Batch size varies a lot, log scale helps visualize\n", "plt.show()" ] }, { "cell_type": "markdown", "id": "111c8471", "metadata": {}, "source": [ "Based on the 10,000 grid cell tests, we skipped individual runs and started directly with batching (100 cells per batch) for the 100,000 grid cell system. Again, the biggest gains came from using at least 2 workers, with improvements tapering beyond ~4 workers. Batch sizes between 100–500 cells were similarly efficient. Locally, this calculation takes on average 1 minute to run, so the use of Dask provides ~35% improvement in run time. We expect even greater benefits at larger scales, and users can apply these scaling results to optimize resource use on their own systems." ] }, { "cell_type": "code", "execution_count": 8, "id": "4bf6e81a", "metadata": {}, "outputs": [ { "data": { "image/png": "", "text/plain": [ "
" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# Filter for grid_cells == 10000\n", "df_100k = scaling_df[scaling_df['grid_cells'] == 100000]\n", "\n", "# Plot: batch_size on x, run_time (s) on y, grouped by workers\n", "plt.figure(figsize=(8,6))\n", "\n", "for workers, group in df_100k.groupby('workers'):\n", " plt.plot(group['batch_size'], group['run_time (s)'], marker='o', label=f'{workers} workers')\n", "\n", "plt.xlabel('Batch Size')\n", "plt.ylabel('Run Time (s)')\n", "plt.title('Performance for 100,000 Grid Cells')\n", "plt.legend(title='Workers')\n", "plt.grid(True)\n", "#plt.xscale('log') # Batch size varies a lot, log scale helps visualize\n", "plt.show()" ] } ], "metadata": { "kernelspec": { "display_name": "mb-dask", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.11.13" } }, "nbformat": 4, "nbformat_minor": 5 }